/**
 *
 * Copyright (C) 2013 陕西威尔伯乐信息技术有限公司
 *
 * @className:com.wellbole.sms.client.service.StatusReportFetcherThreadService
 * @description: 状态报告接受线程服务，该服务定期主动获取状态报告，并回调客户提供的状态报告回调接口进行状态报告处理
 *
 * @version:v1.0.0
 * @author:李飞
 *
 * Modification History:
 * Date         Author      Version     Description
 * -----------------------------------------------------------------
 * 2013年7月1日      李飞                       v1.0.0        create
 *
 */
package com.wellbole.sms.client.service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import com.wellbole.sms.client.StatusReportFetcher;
import com.wellbole.sms.client.callback.StatusReportMessageFetchCallback;
import com.wellbole.sms.client.impl.DefaultStatusReportMessageReceivedCallback;
import com.wellbole.sms.client.message.StatusReportMessage;
import com.wellbole.sms.client.result.StatusReportFetchResult;

/**
 *
 * 状态报告接受线程服务，该服务定期主动获取状态报告，并回调客户提供的状态报告回调接口进行状态报告处理
 * @author 李飞
 *
 */
public class StatusReportFetcherThreadService implements Runnable, Service {

	/**
	 * 需要进行状态报告检查的下行消息队列。
	 */
	private Queue<String> statusReportQueue = new ConcurrentLinkedQueue<String>();


	/**
	 * 记录状态报告检车结果的map对象。
	 */
	private Map<String,StatusReport> statusReportMap = new ConcurrentHashMap<String,StatusReport>();


	/**
	 * 内部类，用于保存状态报告的请求次数和状态，只有状态发生变化时，才回调。
	 * @author dafei
	 *
	 */
	private static class StatusReport{
		/**
		 * 状态
		 */
		private int status = 0;
		/**
		 * 请求次数
		 */
		private int tryTimes = 0;

		/**
		 * @return the status
		 */
		public final int getStatus() {
			return status;
		}
		/**
		 * @param status the status to set
		 */
		public final void setStatus(int status) {
			this.status = status;
		}
		/**
		 * @return the tryTimes
		 */
		public final int getTryTimes() {
			return tryTimes;
		}
		/**
		 * @param tryTimes the tryTimes to set
		 */
		public final void setTryTimes(int tryTimes) {
			this.tryTimes = tryTimes;
		}
	}


	/**
	 * 默认检查周期(3s)
	 */
	private final static int DEFAULT_PERIOD = 30;

	/**
	 * 最大重试次数(20)
	 */
	private final static int MAX_RETRY_TIMES = 20;


	/**
	 * 日志
	 */
	private static final Logger logger = LogManager.getLogger(Thread.currentThread().getStackTrace()[2].getClassName());

	/**
	 * 状态报告接受客户端
	 */
	private StatusReportFetcher statusReportFetcher = null;

	/**
	 * 状态报告处理回调接口
	 */
	private StatusReportMessageFetchCallback statusReportMessageFetchCallback = null;


	/**
	 * 上行短信检查周期（默认10秒种检查一次），单位秒
	 */
	private int period = DEFAULT_PERIOD;

	/**
	 * 调度线程池
	 */
	private ScheduledExecutorService scheduledExecutorService = Executors
			.newSingleThreadScheduledExecutor();

	/**
	 * 将msgid加入到状态报告检查队列里
	 * @param msgid
	 */
	public void AddToStatusReportQueue(String msgid){
		if(!this.statusReportMap.containsKey(msgid)){
			this.statusReportQueue.offer(msgid);
			this.statusReportMap.put(msgid, new StatusReport());
		}
	}
	/**
	 * 开始上行短信接受服务。
	 */
	public boolean start() {
		// 变量检查
		if (null == this.statusReportFetcher) {
			logger.error("statusReportReceiver is null!");
			return false;
		}
		if (null == this.statusReportMessageFetchCallback) {
			logger.warn("notification is null!set it to DefaultInboundMessageCallback");
			this.statusReportMessageFetchCallback = new DefaultStatusReportMessageReceivedCallback();

		}
		if (period < DEFAULT_PERIOD) {
			logger.warn("period is too short, set it to default value ："
					+ DEFAULT_PERIOD);
			period = DEFAULT_PERIOD;
		}
		scheduledExecutorService.scheduleAtFixedRate(this, 3, period,
				TimeUnit.SECONDS);
		logger.info("SmsReceiveThreadService is started!");
		return true;
	}

	/**
	 * 停止服务
	 */
	public void stop() {
		scheduledExecutorService.shutdown();
		logger.info("SmsReceiveThreadService is stoped!");
	}

	public void run() {
		//循环从队列里取出需要进行状态报告检查的消息id
		List<String> retryList = new ArrayList<String>();
		//为了避免长期占用，最多处理1000条就返回（大概用时20秒）
		int index = 0;
		while(index < 1000){
			index++;
			String msgid = this.statusReportQueue.poll();
			if(null == msgid){
				//队列无记录，直接返回。
				break;
			}
			StatusReport sr = this.statusReportMap.get(msgid);
			//超过最大重试次数，从map中移除，直接退出。
			if(sr.getTryTimes() > MAX_RETRY_TIMES){
				this.statusReportMap.remove(msgid);
				continue;
			}
			//重试次数加一。
			sr.setTryTimes(sr.getTryTimes() + 1);

			// 取得状态报告
			StatusReportFetchResult statusReportResult = this.statusReportFetcher.fetch(msgid);

			// 发生错误，记录错误信息
			if (statusReportResult.hasError()) {
				logger.error(statusReportResult.getErrorMessage());
				continue;
			}
			if (statusReportResult.hasData()) {
				// 回调处理上行短信
				StatusReportMessage srm = statusReportResult.getStatusReportMessage();
				if(srm.getStatusCode() != sr.getStatus()){
					//状态发生变化了，回调。
					this.statusReportMessageFetchCallback.onFetch(srm);
					if(srm.getStatusCode() > 1){
						//到了终极状态了。不可能再变了。移除。
						this.statusReportMap.remove(msgid);
					}else{
						//还有进一步变化的可能。，重新放入队列。
						retryList.add(msgid);
						//进入下一个状态，重试次数清零。
						sr.setStatus(srm.getStatusCode());
						sr.setTryTimes(0);
					}
				}else{
					//状态没有发生变化，重新放入队列，前面已经增加了重试次数。
					retryList.add(msgid);
				}
			}
		}
		if(retryList.size() > 0){
			this.statusReportQueue.addAll(retryList);
		}
	}


	/**
	 * @param statusReportFetcher 状态报告接收器
	 */
	public final void setStatusReportFetcher(
			StatusReportFetcher statusReportFetcher) {
		this.statusReportFetcher = statusReportFetcher;
	}

	/**
	 * @param statusReportMessageFetchCallback 状态报告回调函数。
	 */
	public final void setStatusReportMessageFetchCallback(
			StatusReportMessageFetchCallback statusReportMessageFetchCallback) {
		this.statusReportMessageFetchCallback = statusReportMessageFetchCallback;
	}

	/**
	 * 设定查询周期，每间隔多少秒查询一次服务器，默认20秒一次。
	 *
	 * @param period
	 *            查询周期，单位秒
	 */
	public final void setPeriod(int period) {
		this.period = period;
	}
}
